{
 "cells": [
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Python 机器学习实战 ——代码样例\n",
    "\n",
    "# 第八章  Python 多线程和多进程浅析\n",
    "\n",
    "#### 多线程引言\n",
    "\n",
    "多线程处理，是 python 乃至很多编程语言中比较复杂的概念，随着 cpu 的多核心、计算速度越来越快、各类网络应用等的出现，对于多个线程的运用，可以有效的提高程序的处理性能和速度。\n",
    "\n",
    "有很多讨论 python 线程、进程和协程的书和资料，有的概念说的不太清楚，有的例子举得太复杂，因此在研究和实践之后，斗胆也讨论一下这个略有复杂的话题。希望不要误人子弟。\n",
    "\n",
    "\n",
    "#### 线程\n",
    "\n",
    "线程的标准定义如下：\n",
    "\n",
    "线程（thread）是操作系统能够进行运算调度的最小单位。它被包含在进程之中，是进程中的实际运作单位。一条线程指的是进程中一个单一顺序的控制流，一个进程中可以并发多个线程，每条线程并行执行不同的任务。\n",
    "\n",
    "在多核或多CPU，或支持 Hyper-threading 的CPU上使用多线程程序设计的好处是显而易见，即提高了程序的执行吞吐率。在单 CPU 单核的计算机上，使用多线程技术，也可以把进程中负责 IO 处理、人机交互而常被阻塞的部分与密集计算的部分分开来执行，编写专门的 workhorse 线程执行密集计算，从而提高了程序的执行效率。\n",
    "\n",
    "\n",
    "#### Python 是解释型语言\n",
    "\n",
    "像 C/C++这样的语言是编译型语言，程序输入到编译器，编译器再根据语言的语法进行解析，然后翻译成语言独立的中间表示，最终链接成具有高度优化的机器码的可执行程序。编译器之所以可以深层次的对代码进行优化，是因为它可以看到整个程序（或者一大块独立的部分）。这使得它可以对不同的语言指令之间的交互进行推理，从而给出更有效的优化手段。\n",
    "\n",
    "python 程序的执行是解释型的，检查语法、翻译成中间状态等也会做，但是不会把整个程序翻译成机器码，可以理解为一行行去执行代码。目前的全栈语言 Javascript 以及非常适合开发网站的 Php 都是解释型语言。 \n",
    "\n",
    "现在的 CPU 4核、8核都是常规了，要想利用多核系统，Python必须支持多线程运行。作为解释型语言，Python的解释器必须做到既安全又高效。多线程编程会遇到的问题是解释器要避免在不同的线程操作内部共享的数据。同时它还要保证在管理用户线程时保证总是有最大化的计算资源。\n",
    "\n",
    "\n",
    "#### Python 线程切换机制\n",
    "\n",
    "python 支持多线程，所以它就有两种模式，一种是协作多任务(cooperative multitasking)，另一种是抢占式多任务(preemptive multitasking)。\n",
    "\n",
    "python 的协作多任务机制是当一个线程开始 sleep 或者进行 I/O 操作时，另一个线程就有机会拿到GIL锁，开始执行它的代码。\n",
    "python 的抢占式多任务机制是每隔 15ms 进行监测，尝试收回 GIL。\n",
    "\n",
    "由于多线程执行时，存在线程的切换，当多个线程同时运行时，保证运行结果符合预期，就是线程安全的。\n",
    "\n",
    "和操作系统进行进程调度类似，当进程执行一段时间之后，发生时钟中断，操作系统响应时钟中断，并在这是进行进程度调度。而 python 中也是通过软件模拟了这种始终中断，来激活线程调度。\n",
    "\n",
    "下面是一个重要的概念，关于线程安全，在说这个之前，先来看看着例子："
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 1,
   "metadata": {
    "collapsed": false
   },
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "1373573\n"
     ]
    }
   ],
   "source": [
    "import threading\n",
    "\n",
    "count = 0\n",
    "\n",
    "def run():\n",
    "    global count\n",
    "    for i in range(1000000):\n",
    "        count = count + 1\n",
    "\n",
    "t1 = threading.Thread(target=run)\n",
    "t2 = threading.Thread(target=run)\n",
    "t1.start()\n",
    "t2.start()\n",
    "t1.join()\n",
    "t2.join()\n",
    "\n",
    "print(count)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "两个线程轮流执行一个加法程序，感觉答案应该是2000000，可以，你会发现每次都小于2000000。这是为什么呢？\n",
    "\n",
    "比如在 count 是 20 的时候，线程 t1 读取了 count，t1 读到的是 20，这时候 cpu 将控制权给了另一个线程 t2。 \n",
    "t2线程读到的 count 也是 20，然后 t2 加1，写回21。线程回到 t1的时候，t1 将前面读到的20也加1，还是21写回。 \n",
    "好了，本来应该连个线程各加1次，等于22的，现在成了21。\n",
    "\n",
    "所以说在这个例子里，只要 cpu 从线程拿走控制权的时候正好是在读完值的时候，就会发生这样的情况。这就是多线程下对全局变量的写操作不是线程安全的现象和原因。"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "#### Python 线程安全\n",
    "\n",
    "因为线程被切换时候，线程的写操作会被中断，所以我们要考虑线程安全这个问题，否则多线程的程序的运行结果就会出错。\n",
    "\n",
    "1 天生线程安全\n",
    "\n",
    "天生线程安全，就是线程代码中只对全局对象进行读操作，而不存在写操作。这种情况下，不论线程在何处中断，都不会影响各个线程本来的执行逻辑。\n",
    "\n",
    "\n",
    "2 实现原子操作\n",
    "\n",
    "在一个线程中，有时需要保证某一行或者某一段代码的逻辑是不可中断的，也就是说要保证这段代码执行的原子性。\n",
    "\n",
    "python 内建的数据类型（list，dict等）的共享变量进行操作，就是原子操作。\n",
    "\n",
    "比如下面这些操作都是原子的，不用担心多线程切换时候的问题\n",
    "\n",
    "* list.append(x)\n",
    "* list1.extend(list2)\n",
    "* x = list[i]\n",
    "* x = list.pop()\n",
    "* list.sort()\n",
    "* x = y\n",
    "\n",
    "\n",
    "3 执行代码的前后加互斥锁。\n",
    "\n",
    "我们修改一下刚才的两个进程的加法例子：\n",
    "\n",
    "最简单的办法就是引入 threading 模块中的 Lock()，然后在 count 计算这里前面加上锁，后面加上释放。\n",
    "\n",
    "lock.acquire()  \n",
    "... 访问可能共享的资源\n",
    "lock.release()\n",
    "\n",
    "举例如下："
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 2,
   "metadata": {
    "collapsed": false
   },
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "2000000\n"
     ]
    }
   ],
   "source": [
    "import threading\n",
    "\n",
    "lock = threading.Lock()\n",
    "\n",
    "count = 0\n",
    "\n",
    "def run():\n",
    "    global count\n",
    "    for i in range(1000000):\n",
    "        # 加锁\n",
    "        lock.acquire()\n",
    "        count += 1\n",
    "        lock.release()\n",
    "\n",
    "t1 = threading.Thread(target=run)\n",
    "t2 = threading.Thread(target=run)\n",
    "t1.start()\n",
    "t2.start()\n",
    "t1.join()\n",
    "t2.join()\n",
    "\n",
    "print(count)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "你可以使用with语句。在使用锁的时候，with语句会在进入语句块之前自动的获取到该锁对象，然后在语句块执行完成后自动释放掉锁。如同在打开文件时候的 with 语句一样，这样比较简洁也安全。"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 3,
   "metadata": {
    "collapsed": false
   },
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "2000000\n"
     ]
    }
   ],
   "source": [
    "import threading\n",
    "\n",
    "lock = threading.Lock()\n",
    "\n",
    "count = 0\n",
    "\n",
    "def run():\n",
    "    global count\n",
    "    for i in range(1000000):\n",
    "        # 使用 with 来进行加锁\n",
    "        with lock:\n",
    "            count += 1\n",
    "    \n",
    "t1 = threading.Thread(target=run)\n",
    "t2 = threading.Thread(target=run)\n",
    "t1.start()\n",
    "t2.start()\n",
    "t1.join()\n",
    "t2.join()\n",
    "\n",
    "print(count)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "锁的操作还是略复杂的，除了简单的直接锁以外，还有RLock，简单锁即便是线程本身也会发生阻塞，RLock 只有在其他线程访问时才会发生阻塞。\n",
    "\n",
    "Semaphores，信号量，信号量是一个更高级的锁机制。信号量内部有一个计数器而不像锁对象内部有锁标识，而且只有当占用信号量的线程数超过信号量时线程才阻塞。这允许了多个线程可以同时访问相同的代码区。\n",
    "\n",
    "当信号量被获取的时候，计数器减小；当信号量被释放的时候，计数器增大。当获取信号量的时候，如果计数器值为0，则该进程将阻塞。当某一信号量被释放，counter值增加为1时，被阻塞的线程（如果有的话）中会有一个得以继续运行。\n",
    "\n",
    "信号量通常被用来限制对容量有限的资源的访问，比如一个网络连接或者数据库服务器。在这类场景中，只需要将计数器初始化为最大值，信号量的实现将为你完成剩下的事情。\n",
    "\n",
    "用 Semaphores 可以实现类似线程池的功能。当然我们其实有更简单的办法来实现线程池，后面会说到。\n",
    "\n",
    "Semaphores 实现线程池的例子可以参考：http://www.bogotobogo.com/python/Multithread/python_multithreading_Synchronization_Semaphore_Objects_Thread_Pool.php\n",
    "\n",
    "\n",
    "\n",
    "3 实现线程同步\n",
    "\n",
    "线程同步是在锁的基础来实现的。通过锁来对各个线程的执行顺序进行控制。一个线程需要等待其它线程完成特定任务之后，才能执行。多个线程之间有依赖关系。比如抓取网站数据，然后分析处理，然后写入数据库，就可以通过线程同步来实现。\n",
    "\n",
    "\n",
    "#### Python 多线程 Step by Step\n",
    "\n",
    "Python 在 CPU 密集运算的场景，多个线程并不能提高太多性能，而对于 I/O 阻塞的场景，可以使得运行效率获得几倍的提高。我们接下来会详细的分析一下。\n",
    "\n",
    "我们先做一个可以用来测试的基准程序，这是一个比较无聊的计算程序，可以理解为是一个 CPU 密集型的测试。当然你也可以替换为找最大公约数、求质数或者读者自己的计算程序。\n",
    "\n",
    "在写这部分内容的时候，我的代码是在 Jupyter 中执行的，这是一台 2012 年秋天款式的 Mac mini，2.3GHz Intel Core i7, 一个处理器 4 个核心，16 G 1600 MHz DDR3 内存。\n",
    "\n",
    "计算测试时间，我们这里就简单的取一次值，对于 CPU 密集运算的情况来说，除非电脑有更加消耗资源的应用，一般差异不大，但是和其他比如磁盘、网络等相关的应用就不一定了。\n"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 4,
   "metadata": {
    "collapsed": false
   },
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "499999500000\n",
      "cost time 0.106146 s\n"
     ]
    }
   ],
   "source": [
    "# 计算一次 my_cal() 函数，获得耗用时间\n",
    "\n",
    "from time import time\n",
    "\n",
    "def my_cal(a):\n",
    "    j = 0\n",
    "    for i in range(a):\n",
    "        j = j + i\n",
    "    print(j)\n",
    "    return j\n",
    "\n",
    "start = time()\n",
    "my_cal(1000000)\n",
    "end = time()\n",
    "print('cost time {:f} s'.format(end - start))"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "collapsed": true
   },
   "source": [
    "真实场景都要复杂得多，我们为了后面的例子，用一个 list 来提供数据源，使用上面的累加函数来计算 5 次。"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 14,
   "metadata": {
    "collapsed": false
   },
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "499999500000\n",
      "1124999250000\n",
      "1999999000000\n",
      "3124998750000\n",
      "4499998500000\n",
      "cost time 0.906054 s\n"
     ]
    }
   ],
   "source": [
    "# 计算五次 my_cal() 函数，获得整体耗用时间\n",
    "\n",
    "from time import time\n",
    "\n",
    "def my_cal(a):\n",
    "    j = 0\n",
    "    for i in range(a):\n",
    "        j = j + i\n",
    "    print(j)\n",
    "    return j\n",
    "\n",
    "start = time()\n",
    "\n",
    "list_01 = [1000000, 1500000, 2000000, 2500000, 3000000]\n",
    "\n",
    "for i in list_01:\n",
    "    my_cal(i)\n",
    "    \n",
    "end = time()\n",
    "print('cost time {:f} s'.format(end - start))"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "当然我们可以写的更加简洁一些，用 map() 函数是最合适在这样的场景了。所以读者如果要测试的话，可以用上面循环的方式，也可以用下面 map() 的方式，看具体情况。整体上来说推荐 python 风格的写法，能用 map() 的尽量就不要用 for 循环了。"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 18,
   "metadata": {
    "collapsed": false
   },
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "499999500000\n",
      "1124999250000\n",
      "1999999000000\n",
      "3124998750000\n",
      "4499998500000\n",
      "cost time 0.927783 s\n"
     ]
    }
   ],
   "source": [
    "from time import time\n",
    "\n",
    "def my_cal(a):\n",
    "    j = 0\n",
    "    for i in range(a):\n",
    "        j = j + i\n",
    "    print(j)\n",
    "    return j\n",
    "\n",
    "start = time()\n",
    "\n",
    "list_01 = [1000000, 1500000, 2000000, 2500000, 3000000]\n",
    "\n",
    "list_02 = list(map(my_cal, list_01))\n",
    "        \n",
    "end = time()\n",
    "print('cost time {:f} s'.format(end - start))"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "collapsed": true
   },
   "source": [
    "我们先用简单的多线程的方式，来看看有没有提速的作用。\n"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 35,
   "metadata": {
    "collapsed": false
   },
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "499999500000\n",
      "1124999250000\n",
      "1999999000000\n",
      "3124998750000\n",
      "4499998500000\n",
      "\n",
      "cost time 1.074110 s\n"
     ]
    }
   ],
   "source": [
    "from time import time\n",
    "import threading\n",
    "\n",
    "def my_cal(a):\n",
    "    j = 0\n",
    "    for i in range(a):\n",
    "        j = j + i\n",
    "    print(j)\n",
    "    return j\n",
    "\n",
    "start = time()\n",
    "\n",
    "list_01 = [1000000, 1500000, 2000000, 2500000, 3000000]\n",
    "\n",
    "Threads = []\n",
    "\n",
    "for i in list_01:\n",
    "    # 建立线程\n",
    "    thread = threading.Thread(target=my_cal,args=(i,))\n",
    "    thread.start()\n",
    "    # 建立线程列表\n",
    "    Threads.append(thread)\n",
    "\n",
    "# 等待线程结束\n",
    "for thread in Threads:\n",
    "    thread.join()\n",
    "       \n",
    "end = time()\n",
    "print('\\ncost time {:f} s'.format(end - start))"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "collapsed": true
   },
   "source": [
    "上面这样写，可以执行多个线程。但是如前面所言，因为实际上还是在不同的线程中轮流，对于 CPU 密集型的例子来说，多线程对性能提升是没有什么用处，还让管理线程占用多一点的资源。\n",
    "\n",
    "注意，创建的 thread 只能执行一次 start() 方法，否则会报错: RuntimeError: threads can only be started once\n",
    "\n",
    "thread 的 join() 方法阻塞当前线程，等待子线程执行完毕后自己再继续执行。如果没有 join() 的阻塞，我们会先看到 cost time 的计算，而实际上上面的线程还没有完成自己的任务。\n",
    "\n",
    "多线程多 join 的情况下，依次执行各线程的join方法，前面一个结束了才能执行后面一个。\n",
    "\n",
    "注意：thread 模块在 Python 3 中已被废弃，用 threading 模块代替。在 Python3 中不能再使用\"thread\" 模块。为了兼容性，Python3 将 thread 重命名为 \"_thread\"。"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 22,
   "metadata": {
    "collapsed": false
   },
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "<bound method Thread.is_alive of <Thread(Thread-43, initial)>>\n",
      "<bound method Thread.is_alive of <Thread(Thread-43, started 123145451360256)>>\n",
      "4499998500000\n"
     ]
    }
   ],
   "source": [
    "# 检查线程的状态\n",
    "from time import time\n",
    "import threading\n",
    "\n",
    "def my_cal(a):\n",
    "    j = 0\n",
    "    for i in range(a):\n",
    "        j = j + i\n",
    "    print(j)\n",
    "    return j\n",
    "\n",
    "thread = threading.Thread(target=my_cal,args=(i,))\n",
    "# 创建后的线程的状态\n",
    "print(thread.isAlive)\n",
    "thread.start()\n",
    "# 线程启动后的状态\n",
    "print(thread.isAlive)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "#### 多进程方式\n",
    "\n",
    "对于 CPU 运算密集的场景，我们换做多进程的方式来看一下。"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 61,
   "metadata": {
    "collapsed": false
   },
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "499999500000\n",
      "1124999250000\n",
      "1999999000000\n",
      "3124998750000\n",
      "4499998500000\n",
      "cost time 0.374396 s\n"
     ]
    }
   ],
   "source": [
    "# 多进程方式\n",
    "from time import time\n",
    "from concurrent.futures import *\n",
    "\n",
    "def my_cal(a):\n",
    "    j = 0\n",
    "    for i in range(a):\n",
    "        j = j + i\n",
    "    print(j)\n",
    "    return j\n",
    "\n",
    "list_01 = [1000000, 2000000, 1500000, 2500000, 3000000]\n",
    "\n",
    "start = time()\n",
    "\n",
    "pool = ProcessPoolExecutor(max_workers=10)\n",
    "\n",
    "list_02 = list(pool.map(my_cal, list_01))\n",
    "        \n",
    "end = time()\n",
    "print('cost time {:f} s'.format(end - start))"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "collapsed": true
   },
   "source": [
    "我们可以看到速度提升了60%以上。我觉得这是提高性能最好和最简单的方法之一。\n",
    "\n",
    "设置多少个 worker，一般是等于 cpu 核心数或者乘以二，服务器 Gunicorn worker 的数量从经验的角度一般配置 2 * core + 1, core指的核心数。\n",
    "\n",
    "修改一下程序，用 timeit 来进行测试。"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 2,
   "metadata": {
    "collapsed": false,
    "scrolled": false
   },
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "2\n",
      "6 loops, best of 3: 496 ms per loop\n",
      "3\n",
      "6 loops, best of 3: 436 ms per loop\n",
      "4\n",
      "6 loops, best of 3: 397 ms per loop\n",
      "5\n",
      "6 loops, best of 3: 327 ms per loop\n",
      "6\n",
      "6 loops, best of 3: 335 ms per loop\n",
      "7\n",
      "6 loops, best of 3: 350 ms per loop\n",
      "8\n",
      "6 loops, best of 3: 350 ms per loop\n",
      "9\n",
      "6 loops, best of 3: 342 ms per loop\n",
      "10\n",
      "6 loops, best of 3: 340 ms per loop\n"
     ]
    }
   ],
   "source": [
    "# 多进程方式，测试 workers\n",
    "\n",
    "from concurrent.futures import *\n",
    "\n",
    "def my_cal(a):\n",
    "    j = 0\n",
    "    for i in range(a):\n",
    "        j = j + i\n",
    "    # print(j)\n",
    "    return j\n",
    "\n",
    "def my_cal_main(workers):\n",
    "\n",
    "    list_01 = [1000000, 2000000, 1500000, 2500000, 3000000]\n",
    "\n",
    "    pool = ProcessPoolExecutor(workers)\n",
    "\n",
    "    list_02 = list(pool.map(my_cal, list_01))\n",
    "\n",
    "for i in range(2,11):\n",
    "    print(i)\n",
    "    %timeit -n6 my_cal_main(i)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "我们可以看到，worker 设为 6 的时候速度最快，再往上没有明显差别。这个还和应用场景有关，所以如果不想太复杂的化，设为 cpu 核心数比较适合。\n",
    "\n",
    "下面的有三种方法可以获得 cpu 核心的数量："
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 50,
   "metadata": {
    "collapsed": false,
    "scrolled": true
   },
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "8\n",
      "8\n"
     ]
    },
    {
     "data": {
      "text/plain": [
       "8"
      ]
     },
     "execution_count": 50,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "# 获得 cpu 核心数量\n",
    "import multiprocessing\n",
    "\n",
    "pool = multiprocessing.Pool()\n",
    "\n",
    "print(pool._processes)\n",
    "\n",
    "print(multiprocessing.cpu_count())\n",
    "\n",
    "import psutil\n",
    "psutil.cpu_count()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "前面说过我测试的电脑用的是 i7 cpu，是4核心，为什么这里显示的是8呢，如果搜索一下 intel 官方网站，可以看到，这块 i7 cpu 的确是4核，但是支持 8 线程，或许是这个原因吧。\n",
    "\n",
    "\n",
    "#### 基于 I/O 的多线程\n",
    "\n",
    "多线程的例子中比较多的就是抓取网页，因为抓取网页是典型的 I/O 开销，因此 python 的多线程终于不显得那么鸡肋了。\n",
    "\n",
    "我们把上面例子中的计算函数修改为抓取网站的大小。先用最标准的方式，不用线程什么的，顺序的进行业务逻辑处理。"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 77,
   "metadata": {
    "collapsed": false
   },
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "http://www.qq.com 246846\n",
      "http://chuangyiji.com 84537\n",
      "http://taobao.com 123926\n",
      "http://mingrihui.com 43480\n",
      "\n",
      "cost time 11.283091 s\n"
     ]
    }
   ],
   "source": [
    "# 多线程方式抓取网页大小\n",
    "from time import time\n",
    "import requests\n",
    "\n",
    "list_url = ['http://www.qq.com', 'http://chuangyiji.com', 'http://taobao.com', 'http://mingrihui.com']\n",
    "\n",
    "def get_url_size(url):\n",
    "    rq = requests.get(url)\n",
    "    length = len(rq.content)\n",
    "    print( url, length)\n",
    "    return length\n",
    "\n",
    "start = time()\n",
    "\n",
    "for url in list_url:\n",
    "   get_url_size(url)\n",
    "       \n",
    "end = time()\n",
    "print('\\ncost time {:f} s'.format(end - start))"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "我在里面故意放了两个自己的网站（你没见过的那两个域名就是），一个网站是在国外，一个网站在国内的云主机上，相对访问速度比较慢，因此在执行程序的时候有时候会有明显的等待。对四个网站处理完，差不多要20秒左右或者更多。大家可以看到结果呈现的顺序是和列表中一样的。这是一个单线程的例子。\n",
    "\n",
    "然后我们修改为上面多线程的模式，程序逻辑几乎一模一样。"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 82,
   "metadata": {
    "collapsed": false
   },
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "http://www.qq.com 246836\n",
      "http://mingrihui.com 43480\n",
      "http://taobao.com 123926\n",
      "http://chuangyiji.com 84537\n",
      "\n",
      "cost time 5.828597 s\n"
     ]
    }
   ],
   "source": [
    "# 多线程方式执行网站大小抓取\n",
    "\n",
    "from time import time\n",
    "import requests\n",
    "import threading\n",
    "\n",
    "list_url = ['http://www.qq.com', 'http://chuangyiji.com', 'http://taobao.com', 'http://mingrihui.com']\n",
    "\n",
    "def get_url_size(url):\n",
    "    rq = requests.get(url)\n",
    "    length = len(rq.content)\n",
    "    print( url, length)\n",
    "    return length\n",
    "\n",
    "start = time()\n",
    "\n",
    "Threads = []\n",
    "\n",
    "for url in list_url:\n",
    "    thread = threading.Thread(target=get_url_size, args=(url,))\n",
    "    thread.start()\n",
    "    Threads.append(thread)\n",
    "    \n",
    "for thread in Threads:\n",
    "    thread.join()\n",
    "       \n",
    "end = time()\n",
    "print('\\ncost time {:f} s'.format(end - start))"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "可以看到总的来说执行速度快了很多，并且通过显示的网页大小结果，我们会发现，和上面的顺序不一定一样(我自己测试了很多次都不一样)，qq 和 淘宝都很快，在国内云上第三，在国外的网站最后，基本都是这个顺序。\n",
    "\n",
    "毋庸多言，这就是多线程比较适用在非堵塞业务场景的证明。\n",
    "\n",
    "python 3.2 开始新增了 concurrent.futures 模块，提供了一种优雅的方式来完成多线程或者多进程的并发实现，我们先“野蛮”一点，使用多进程方式来实现这个功能。"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 83,
   "metadata": {
    "collapsed": false
   },
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "http://www.qq.com 246793\n",
      "http://mingrihui.com 43480\n",
      "http://chuangyiji.com 84537\n",
      "http://taobao.com 123918\n",
      "\n",
      "cost time 8.208078 s\n"
     ]
    }
   ],
   "source": [
    "# 多进程方式执行网站大小抓取\n",
    "\n",
    "from time import time\n",
    "import requests\n",
    "import concurrent.futures\n",
    "\n",
    "\n",
    "list_url = ['http://www.qq.com', 'http://chuangyiji.com', 'http://taobao.com', 'http://mingrihui.com']\n",
    "\n",
    "def get_url_size(url):\n",
    "    rq = requests.get(url)\n",
    "    length = len(rq.content)\n",
    "    print(url, length)\n",
    "    return length\n",
    "\n",
    "start = time()\n",
    "\n",
    "pool = concurrent.futures.ProcessPoolExecutor(max_workers=6)\n",
    "\n",
    "list_result = list(pool.map(get_url_size, list_url))\n",
    "       \n",
    "end = time()\n",
    "print('\\ncost time {:f} s'.format(end - start))"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "你会发现和前面 cpu 密集运算的例子不同，使用多进程方式并没有提高太多，慢的网站你给它一个单独核心，还是要等待，多线程切换时候，这些等待的时候我们就已经可以先抓其他了。"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 84,
   "metadata": {
    "collapsed": false
   },
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "http://www.qq.com 246819\n",
      "http://taobao.com 123918\n",
      "http://mingrihui.com 43480\n",
      "http://chuangyiji.com 84537\n",
      "\n",
      "cost time 4.587851 s\n"
     ]
    }
   ],
   "source": [
    "# 多进程方式执行网站大小抓取\n",
    "# executor 写法\n",
    "from time import time\n",
    "import requests\n",
    "import concurrent.futures\n",
    "\n",
    "list_url = ['http://www.qq.com', 'http://chuangyiji.com', 'http://taobao.com', 'http://mingrihui.com']\n",
    "\n",
    "def get_url_size(url):\n",
    "    rq = requests.get(url)\n",
    "    length = len(rq.content)\n",
    "    print(url, length)\n",
    "    return length\n",
    "\n",
    "start = time()\n",
    "\n",
    "with concurrent.futures.ProcessPoolExecutor(max_workers=6) as executor:\n",
    "\n",
    "    # 关键是 submit 方法\n",
    "    future = {executor.submit(get_url_size, url): url for url in list_url}\n",
    "       \n",
    "end = time()\n",
    "print('\\ncost time {:f} s'.format(end - start))"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "concurrent.futures 模块提供了高级的接口对于异步方式进行执行调用，异步执行可以通过线程池，或者独立的进程池。通过抽象的 Executor 类对于两种调用方式有一致的接口。这样对于我们来说，不管是多线程还是多进程，在代码层面都可以方便的切换。\n",
    "\n",
    "下面的写法是参照了 python 3 的官方文档，通过线程池来实现多线程的抓取，所以我把网站 url 增加到8个，通过6个 worker 的线程池来抓取。"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 85,
   "metadata": {
    "collapsed": false,
    "scrolled": true
   },
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "'http://www.sohu.com' page is 184564 bytes\n",
      "'http://www.baidu.com' page is 2381 bytes\n",
      "'http://www.qq.com' page is 246819 bytes\n",
      "'http://mingrihui.com' page is 43480 bytes\n",
      "'http://www.163.com' page is 660241 bytes\n",
      "'http://www.sina.com.cn' page is 601949 bytes\n",
      "'http://chuangyiji.com' page is 84537 bytes\n",
      "'http://taobao.com' page is 123986 bytes\n",
      "\n",
      "cost time 12.424974 s\n"
     ]
    }
   ],
   "source": [
    "# 执行网站大小抓取\n",
    "# 使用线程池方式\n",
    "\n",
    "from time import time\n",
    "import concurrent.futures\n",
    "import requests\n",
    "\n",
    "list_url = ['http://www.qq.com', \n",
    "            'http://chuangyiji.com', \n",
    "            'http://taobao.com',\n",
    "            'http://www.sohu.com',\n",
    "            'http://www.163.com',\n",
    "            'http://www.sina.com.cn',\n",
    "            'http://www.baidu.com',\n",
    "            'http://mingrihui.com']\n",
    "\n",
    "def get_url_size(url):\n",
    "    rq = requests.get(url)\n",
    "    length = len(rq.content)\n",
    "    return length\n",
    "\n",
    "start = time()\n",
    "\n",
    "# 设置了线程池中 worker\n",
    "with concurrent.futures.ThreadPoolExecutor(max_workers=6) as executor:\n",
    "    future_to_url = {executor.submit(get_url_size, url): url for url in list_url}\n",
    "    for future in concurrent.futures.as_completed(future_to_url):\n",
    "        url = future_to_url[future]\n",
    "        try:\n",
    "            data = future.result()\n",
    "        except Exception as exc:\n",
    "            print('%r generated an exception: %s' % (url, exc))\n",
    "        else:\n",
    "            print('%r page is %d bytes' % (url, data))\n",
    "       \n",
    "end = time()\n",
    "print('\\ncost time {:f} s'.format(end - start))"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "collapsed": true
   },
   "source": [
    "#### 小结\n",
    "\n",
    "Python 的线程在 GIL 的控制之下，线程之间，对整个 Python 解释器，对 Python 提供的 CAPI 的访问，都是互斥的，这可以看做 Python 内核级的互斥机制，这种互斥是我们不能控制的，这样就保护了共享资源。\n",
    "\n",
    "Python 语言的确是比较讲究简洁以及人类化，有些编程语言的设计为了性能或者独特，使得学习曲线比较陡峭，Python 的解释器有了 GIL 之后会容易实现一些，当然单价就是性能会有影响。（很长一段时间内，Python 没有那么流行就是因为性能这个锅一直背着，但是随着服务器性能最近几年的直线上升，Python 性能在大多数应用场景下已经不是问题了。）\n",
    "\n",
    "在传统进程、线程之后协程的概念继续发展，异步的操作也使得诸如 sanic 这样的 web 框架在性能指标上超过了这几年在整个领域非常厉害的 go 语言，也将 flask 等“传统”的 web 框架甩开几乎数量级的差距。我们之后继续会探讨 python 3 的协程和异步这些。\n",
    "\n",
    "#### 参考资料\n",
    "\n",
    "* https://zh.wikipedia.org/wiki/%E7%BA%BF%E7%A8%8B\n",
    "* https://jeffknupp.com/blog/2012/03/31/pythons-hardest-problem/ \n",
    "* https://www.oschina.net/translate/pythons-hardest-problem\n",
    "* http://www.dabeaz.com/python/UnderstandingGIL.pdf\n",
    "* http://www.jianshu.com/p/4097b7a5a1bf"
   ]
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "Python 3",
   "language": "python",
   "name": "python3"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.5.1"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 2
}
